// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

#include "io/buffered_reader.h"

#include <algorithm>
#include <sstream>

#include "common/config.h"
#include "olap/olap_define.h"
#include "util/bit_util.h"

namespace doris {

// buffered reader
BufferedReader::BufferedReader(RuntimeProfile* profile, FileReader* reader, int64_t buffer_size)
        : _profile(profile),
          _reader(reader),
          _buffer_size(buffer_size),
          _buffer_offset(0),
          _buffer_limit(0),
          _cur_offset(0) {
    if (_buffer_size == -1L) {
        _buffer_size = config::remote_storage_read_buffer_mb * 1024 * 1024;
    }
    _buffer = new char[_buffer_size];
    // set the _cur_offset of this reader as same as the inner reader's,
    // to make sure the buffer reader will start to read at right position.
    _reader->tell(&_cur_offset);
}

BufferedReader::~BufferedReader() {
    close();
}

Status BufferedReader::open() {
    if (!_reader) {
        return Status::InternalError("Open buffered reader failed, reader is null");
    }

    // the macro ADD_XXX is idempotent.
    // So although each scanner calls the ADD_XXX method, they all use the same counters.
    _read_timer = ADD_TIMER(_profile, "FileReadTime");
    _remote_read_timer = ADD_TIMER(_profile, "FileRemoteReadTime");
    _read_counter = ADD_COUNTER(_profile, "FileReadCalls", TUnit::UNIT);
    _remote_read_counter = ADD_COUNTER(_profile, "FileRemoteReadCalls", TUnit::UNIT);
    _remote_read_bytes = ADD_COUNTER(_profile, "FileRemoteReadBytes", TUnit::BYTES);
    _remote_read_rate = _profile->add_derived_counter(
            "FileRemoteReadRate", TUnit::BYTES_PER_SECOND,
            std::bind<int64_t>(&RuntimeProfile::units_per_second, _remote_read_bytes,
                               _remote_read_timer),
            "");

    RETURN_IF_ERROR(_reader->open());
    return Status::OK();
}

//not support
Status BufferedReader::read_one_message(std::unique_ptr<uint8_t[]>* buf, int64_t* length) {
    return Status::NotSupported("Not support");
}

Status BufferedReader::read(uint8_t* buf, int64_t buf_len, int64_t* bytes_read, bool* eof) {
    DCHECK_NE(buf_len, 0);
    RETURN_IF_ERROR(readat(_cur_offset, buf_len, bytes_read, buf));
    if (*bytes_read == 0) {
        *eof = true;
    } else {
        *eof = false;
    }
    return Status::OK();
}

Status BufferedReader::readat(int64_t position, int64_t nbytes, int64_t* bytes_read, void* out) {
    SCOPED_TIMER(_read_timer);
    if (nbytes <= 0) {
        *bytes_read = 0;
        return Status::OK();
    }
    RETURN_IF_ERROR(_read_once(position, nbytes, bytes_read, out));
    //EOF
    if (*bytes_read <= 0) {
        return Status::OK();
    }
    while (*bytes_read < nbytes) {
        int64_t len;
        RETURN_IF_ERROR(_read_once(position + *bytes_read, nbytes - *bytes_read, &len,
                                   reinterpret_cast<char*>(out) + *bytes_read));
        // EOF
        if (len <= 0) {
            break;
        }
        *bytes_read += len;
    }
    return Status::OK();
}

Status BufferedReader::_read_once(int64_t position, int64_t nbytes, int64_t* bytes_read,
                                  void* out) {
    ++_read_count;
    // requested bytes missed the local buffer
    if (position >= _buffer_limit || position < _buffer_offset) {
        // if requested length is larger than the capacity of buffer, do not
        // need to copy the character into local buffer.
        if (nbytes > _buffer_size) {
            auto st = _reader->readat(position, nbytes, bytes_read, out);
            if (st.ok()) {
                _cur_offset = position + *bytes_read;
                ++_remote_read_count;
                _remote_bytes += *bytes_read;
            }
            return st;
        }
        _buffer_offset = position;
        RETURN_IF_ERROR(_fill());
        if (position >= _buffer_limit) {
            *bytes_read = 0;
            return Status::OK();
        }
    }
    int64_t len = std::min(_buffer_limit - position, nbytes);
    int64_t off = position - _buffer_offset;
    memcpy(out, _buffer + off, len);
    *bytes_read = len;
    _cur_offset = position + *bytes_read;
    return Status::OK();
}

Status BufferedReader::_fill() {
    if (_buffer_offset >= 0) {
        int64_t bytes_read = 0;
        SCOPED_TIMER(_remote_read_timer);
        RETURN_IF_ERROR(_reader->readat(_buffer_offset, _buffer_size, &bytes_read, _buffer));
        _buffer_limit = _buffer_offset + bytes_read;
        ++_remote_read_count;
        _remote_bytes += bytes_read;
    }
    return Status::OK();
}

int64_t BufferedReader::size() {
    return _reader->size();
}

Status BufferedReader::seek(int64_t position) {
    _cur_offset = position;
    return Status::OK();
}

Status BufferedReader::tell(int64_t* position) {
    *position = _cur_offset;
    return Status::OK();
}

void BufferedReader::close() {
    _reader->close();
    SAFE_DELETE_ARRAY(_buffer);

    if (_read_counter != nullptr) {
        COUNTER_UPDATE(_read_counter, _read_count);
    }
    if (_remote_read_counter != nullptr) {
        COUNTER_UPDATE(_remote_read_counter, _remote_read_count);
    }
    if (_remote_read_bytes != nullptr) {
        COUNTER_UPDATE(_remote_read_bytes, _remote_bytes);
    }
}

bool BufferedReader::closed() {
    return _reader->closed();
}

BufferedFileStreamReader::BufferedFileStreamReader(FileReader* file, uint64_t offset,
                                                   uint64_t length)
        : _file(file), _file_start_offset(offset), _file_end_offset(offset + length) {}

Status BufferedFileStreamReader::seek(uint64_t position) {
    if (_file_position != position) {
        RETURN_IF_ERROR(_file->seek(position));
        _file_position = position;
    }
    return Status::OK();
}

Status BufferedFileStreamReader::read_bytes(const uint8_t** buf, uint64_t offset,
                                            size_t* bytes_to_read) {
    if (offset < _file_start_offset) {
        return Status::IOError("Out-of-bounds Access");
    }
    if (offset >= _file_end_offset) {
        *bytes_to_read = 0;
        return Status::OK();
    }
    int64_t end_offset = offset + *bytes_to_read;
    if (_buf_start_offset <= offset && _buf_end_offset >= end_offset) {
        *buf = _buf.get() + offset - _buf_start_offset;
        return Status::OK();
    }
    if (_buf_size < *bytes_to_read) {
        size_t new_size = BitUtil::next_power_of_two(*bytes_to_read);
        std::unique_ptr<uint8_t[]> new_buf(new uint8_t[new_size]);
        if (offset >= _buf_start_offset && offset < _buf_end_offset) {
            memcpy(new_buf.get(), _buf.get() + offset - _buf_start_offset,
                   _buf_end_offset - offset);
        }
        _buf = std::move(new_buf);
        _buf_size = new_size;
    } else if (offset > _buf_start_offset && offset < _buf_end_offset) {
        memmove(_buf.get(), _buf.get() + offset - _buf_start_offset, _buf_end_offset - offset);
    }
    if (offset < _buf_start_offset || offset >= _buf_end_offset) {
        _buf_end_offset = offset;
    }
    _buf_start_offset = offset;
    int64_t to_read = end_offset - _buf_end_offset;
    RETURN_IF_ERROR(seek(_buf_end_offset));
    bool eof = false;
    int64_t buf_remaining = _buf_end_offset - _buf_start_offset;
    RETURN_IF_ERROR(_file->read(_buf.get() + buf_remaining, to_read, &to_read, &eof));
    *bytes_to_read = buf_remaining + to_read;
    _buf_end_offset += to_read;
    *buf = _buf.get();
    return Status::OK();
}

Status BufferedFileStreamReader::read_bytes(Slice& slice, uint64_t offset) {
    return read_bytes((const uint8_t**)&slice.data, offset, &slice.size);
}

} // namespace doris
